Distributed Systems — TiTilda Notes

Contents

Introduction

A distributed system is a collection of independent computers that appears to its users as a single computer.

Some key aspects of distributed systems are:

Distributed System Architecture

High-Level Architecture

A distributed system is composed of multiple machines connected through a network. Those machines can be:

Runtime Architecture

A distributed system can be classified based on the components, the connections, the data exchange, and the interaction between them.

Client-Server

This is the most common architecture.

It is based on two components:

In a distributed system, one server can use services offered by other servers, creating a multi-tier (N-tier) infrastructure.

The services that can be distributed are:

This architecture is easy to manage and scale, but the server is a single point of failure and a bottleneck for the system.

Service Oriented

The service oriented architecture is based on three components:

The interface is described with a standard language (e.g. WSDL).

The communication is done through a standard protocol (e.g. SOAP).

REST

REpresentational State Transfer (REST) is an architectural style that defines how web standards should be used.

The interaction happens between a client and a server. The interaction is stateless, meaning that each request from the client to the server must contain all the information needed to understand and process the request.

The response must be explicitly labeled as cacheable or non-cacheable.

The server interface must satisfy four constraints:

  1. Identification of resources: Each resource is identified by a unique URI;
  2. Manipulation of resources through representations: The communication is done through representations of the resource (e.g. JSON, XML, HTML), selected dynamically based on the client needs;
  3. Self-descriptive messages: Each message contains metadata about how to process it (e.g. HTTP headers);
  4. Hypermedia as the engine of application state: The server provides links to other resources dynamically, allowing clients to navigate the application state.

Peer-to-Peer

All the components in a peer-to-peer architecture are equal and can act as both client and server.

A server, in a client-server architecture, represents a single point of failure and a bottleneck for the system. In a peer-to-peer architecture, the failure of a single node does not affect the overall system, as other nodes can take over its responsibilities.

Object-Oriented

An object-oriented architecture is based on the concept of objects that encapsulate data and behavior. The objects provide an interface to interact with them.

Data-Centred

A data-centered architecture is based on a shared data space (Repository - tuple space or shared global state) that is accessible by all the components in the system.

Data can be added to the repository or read from it.

When performing a read operation, the component can either take from the repository (destructive read) or read the data without removing it (non-destructive read).

The components provide a pattern (or template) to access the data. If there is no match, the component waits until one appears.
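As an illustration, a minimal in-process sketch of a tuple space with destructive (take) and non-destructive (read) operations; the API and the pattern-matching rule (None as a wildcard) are assumptions, not a specific product's interface.

```python
import threading

class TupleSpace:
    """Minimal shared data space: tuples are matched against patterns,
    where None in a pattern acts as a wildcard."""

    def __init__(self):
        self._tuples = []
        self._cond = threading.Condition()

    def write(self, tup):
        with self._cond:
            self._tuples.append(tup)
            self._cond.notify_all()          # wake up waiting readers

    def _match(self, pattern):
        for tup in self._tuples:
            if len(tup) == len(pattern) and all(
                p is None or p == v for p, v in zip(pattern, tup)
            ):
                return tup
        return None

    def read(self, pattern):
        """Non-destructive read: blocks until a matching tuple exists."""
        with self._cond:
            while (tup := self._match(pattern)) is None:
                self._cond.wait()
            return tup

    def take(self, pattern):
        """Destructive read: removes and returns the matching tuple."""
        with self._cond:
            while (tup := self._match(pattern)) is None:
                self._cond.wait()
            self._tuples.remove(tup)
            return tup

space = TupleSpace()
space.write(("temperature", "room1", 21.5))
print(space.read(("temperature", "room1", None)))   # non-destructive
print(space.take(("temperature", None, None)))      # destructive
```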

Event-Based

An event-based architecture is based on the concept of events that are generated by components in the system.

Other components can subscribe to events and be notified when an event occurs.

Mobile Code

The Mobile Code architectural style allows for the dynamic movement of code or an entire running process across the network at runtime. This capability enhances a system’s flexibility, enabling it to receive new functionality without being halted or manually updated.

This style is often implemented in languages that run on a virtual machine (VM), such as Java or JavaScript, because the VM provides a crucial layer of abstraction and control over the executing environment.

The ability to receive and execute code from an external source is a major security risk.

Code on Demand

The client retrieves the executable code (or script) from a server and executes it locally.

Only the code itself moves (Weak Mobility).

An example is a web browser that downloads and executes JavaScript code from a web server.

Remote Evaluation

The client sends a request that is executed as code on the server.

The server performs the computation and returns only the result to the client (Weak Mobility).

An example is an SQL query that is sent to a database server for execution, and the results are returned to the client, or a cloud notebook like Google Colab where the code is executed on a remote server.

Mobile Agent

A process, including both its code and its complete execution state (data, program counter, stack), is suspended, migrated to a new machine, and then resumed (Strong Mobility).

Interactions

The behavior of a distributed system is determined by a distributed algorithm: an algorithm executed collaboratively across multiple, independent machines. Due to the distribution, the algorithm must handle communication, synchronization, and fault tolerance.

A critical factor in the performance and behavior of a distributed algorithm is time: the speed of the processes, the performance of the communication channel, and the clock drift rates between machines (the rate at which their clocks diverge).

There are two types of distributed systems based on time: synchronous systems, where there are known bounds on process execution speed, message transmission delay, and clock drift; and asynchronous systems, where no such bounds exist.

Pepperland Example

There are two generals on top of two hills. They need to sync who will lead an assault and when to start it. They can communicate on a reliable channel.

To choose the leader, they could both pick a random number; the one with the bigger number wins.

In an asynchronous system it is impossible to agree on a time to charge, as there is no bound on message delay. One general could send a message to the other, but there is no guarantee that the message will arrive within a specific time. The other general could wait for the message, but there is no guarantee that it will ever arrive. The only option is to charge immediately, but this could lead to failure if the other general does not charge at the same time.

Communication

Communication protocols define how processes in a distributed system exchange information.

These protocols are primarily classified along two dimensions:

Remote Procedure Call (RPC)

Remote Procedure Call (RPC) is a communication abstraction that allows a program to execute a procedure or function on a different machine as if it were a local call (Location Transparency).

RPC is built upon a middleware layer that hides the network communication.

This is done using a proxy called a Stub, which is responsible for:

The stubs are defined with an Interface Definition Language (IDL), which is language independent, allowing the caller and the callee to be written in different languages.

RPC Communication

The communication is done by passing parameters to the remote procedure.

The standard method is to pass parameters by value, as there is no shared memory between the two machines.

If there is a need to pass parameters by reference, it is possible to simulate it using the copy/restore method: the parameter is passed by value, and when the function returns, the (possibly modified) value is copied back into the caller's variable.
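A minimal local sketch of the copy/restore idea (illustrative only, not a real RPC stub): the stub copies the argument in, the callee works on its own copy, and the result is copied back into the caller's variable on return.

```python
import copy

def remote_increment_all(values):
    """Pretend server-side procedure: mutates its local copy of the list."""
    for i in range(len(values)):
        values[i] += 1
    return values                      # the modified copy travels back

def call_with_copy_restore(proc, arg):
    """Client stub: copy-in, invoke, then restore the result into the
    caller's object to simulate pass-by-reference."""
    sent = copy.deepcopy(arg)          # copy-in (marshalling)
    result = proc(sent)                # would normally cross the network
    arg.clear()                        # restore: overwrite caller's data
    arg.extend(result)

data = [1, 2, 3]
call_with_copy_restore(remote_increment_all, data)
print(data)                            # [2, 3, 4] — the caller sees the changes
```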

The RPC communication is typically synchronous (blocking), to mimic the local procedure call behavior, but it can also be:

The communication can be optimized with two strategies:

RPC Middleware

The client needs to locate the server that provides the requested service.

This can be done in two ways:

To conserve resources, the server can be dynamically activated once a request arrives.

Inter-Process communication (IPC)

When two distinct processes are on the same machine, it would be possible to use RPC to communicate, but it would be inefficient.

Instead, a Lightweight RPC can be used, which exchanges data through a shared memory region managed by the middleware.

Remote Method Invocation (RMI)

Remote Method Invocation (RMI) is the object-oriented counterpart to RPC. It allows a process to invoke a method on a remote object residing in another process.

The communication is done through method calls on a local proxy (Stub) that acts as if the remote object were local. This is necessary because it is generally not possible to pass objects by value between two machines that might use different programming languages.

In Java RMI, since both the client and the server use the same language, it is possible to pass objects by value, but the class code must be available locally or downloaded dynamically.

Message Oriented Communication (MOC)

Message Oriented Communication is a foundational style in distributed systems, where interaction is based on the asynchronous exchange of discrete, self-contained messages. It is generally a one-way interaction, often based on events.

Basic Message Passing

The most fundamental form of MOC uses low-level socket programming, relying on the Operating System primitives for communication.

Message Passing Interface (MPI)

Message Passing Interface (MPI) is a library that provides a rich set of primitives to manage communication in parallel and distributed systems.

It allows controlling the synchronization level of the communication and avoids explicit serialization of data.

Messages can be broadcast or scattered among multiple processes to perform parallel computation.

Message Queueing

Message Queueing is a system that introduces a persistent storage between the client and the server (queue) to store messages.

This decouples the client and the server in both time and space.

The client pushes messages into the queue and the server pops messages from the queue asynchronously; the two act as peers.

graph TD
  A[Client] -->|Pushes Message| B[Message Queue]
  B -->|Pops Message| C[Server]

Inside the system there could be multiple queues identified by a name that can be statically or dynamically created.

This decoupling makes it easy to scale the system by adding more servers to process the incoming messages.

The system can also contain queue managers that act as relays between multiple queues to create complex topologies, increasing fault tolerance.

Publish-Subscribe

The publish-subscribe model is an event-driven architecture where publishers generate events/messages without knowing who the receivers are, and subscribers express interest in events without knowing who published them.

This decouples publishers and subscribers in space, but the communication is transient: only subscribers that are online will receive the messages.

graph TD
  A[Publisher] -->|Publishes Event| B[Event Broker]
  B -->|Notifies| C[Subscriber 1]
  B -->|Notifies| D[Subscriber 2]
  B -->|Notifies| E[Subscriber 3]
  F[Publisher 2] -->|Publishes Event| B

A component can subscribe based on:

The core component of this architecture is the Event Dispatcher (broker), which manages the subscriptions and the notifications.

A Complex Event Processing system is often layered on top of the dispatcher. It analyzes streams of incoming simple events to detect patterns, correlations, or anomalies, and then generates a single, higher-level complex event (e.g., detecting “Fire” from simple events like “Smoke detected” and “High Temperature reading”).

Distributed Publish-Subscribe

To overcome the Event Dispatcher bottleneck, a distributed architecture is organized into a network of message brokers and the message is forwarded across the network using different strategies.

For acyclic graphs:

Paths can be optimized if some are a subset of others.

Cyclic topologies are more fault-tolerant but introduce the problem of message loops (flooding) and uncertainty about delivery paths.

Stream Oriented communication

Stream-Oriented Communication involves transmitting a continuous, ordered sequence of data from a source to a sink. This model is essential for multimedia and real-time applications where the timing of the data arrival is often as critical as its content.

Timing Constraints

In streaming, the correctness of the communication is heavily impacted by time, leading to three classifications based on timing guarantees:

Quality of Service (QoS)

The network doesn’t guarantee the Quality of the service. Some metrics that need management:

Streaming is implemented using UDP instead of TCP. This is because, for real-time data, a retransmitted packet is useless, as it arrives too late for playback. UDP’s speed and lack of automatic retransmission make it suitable.

QoS is managed on the client and server side using different techniques:

Multiple Streams Synchronization

It is possible that multiple streams need to be synchronized (e.g., audio and video). This can be done:

Naming

In a distributed system, naming is the mechanism used to reference and locate system entities, which can range from physical hosts, files, and services to users and abstract processes.

Names can be human-friendly (e.g., “www.example.com”) or machine-friendly (e.g., IP addresses).

A name can also be global (unique across the entire system) or local (unique within a specific context or domain).

The address is the actual location of the entity in the network (e.g., an IP address) and it can be mutable and change over time.

An entity should be identified by an immutable Identifier, and name resolution is used to convert a name into an address.

Name resolution can be performed in different ways:

Flat Naming

A name is flat if it is a simple sequence of characters that contains no structural or topological information about the entity’s location.

Simple Solution

This method is suitable only for small-scale, local area environments where network traffic is manageable.

To locate an entity with a specific name, a request is broadcasted to all hosts on the local network segment. The host that recognizes the name responds with its address.

An example is ARP, that broadcasts a request to find the MAC address associated with an IP address.

Home Based

This strategy handles mobility by having a fixed home address for each entity.

When an entity moves to a new location, it registers its new address with its home server.

When another entity wants to communicate with it, it first contacts the home server that returns a Forwarding Pointer (the current address) allowing direct communication.

This adds an extra step (the trip to the Home Host) to every connection setup, increasing latency.

sequenceDiagram
  participant EntityA as Entity A
  participant HomeServer as Home Server
  participant EntityB as Entity B

  EntityA->>HomeServer: Register new address (after moving)
  EntityB->>HomeServer: Query for Entity A's address
  HomeServer-->>EntityB: Return forwarding pointer (current address)
  EntityB->>EntityA: Communicate directly using forwarding pointer

Distributed Hash Table

DHTs create a scalable, decentralized system for mapping flat names (keys) to addresses (values) across thousands of nodes.

Each node in the DHT is assigned a unique identifier (ID) from the same address space as the keys. Then the key space is partitioned among the nodes based on the hash of the key (the first node with an ID greater than or equal to the key is responsible for that key).

The nodes form an overlay network, organized as a logical ring.
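A minimal sketch of the key-to-node mapping on the ring, using a plain successor scan over hashed IDs (the hash function and the size of the ID space are assumptions):

```python
import hashlib

ID_BITS = 16                           # small ID space, assumed for the example
ID_SPACE = 2 ** ID_BITS

def node_id(name):
    """Hash a name (node address or key) into the ring's ID space."""
    return int(hashlib.sha1(name.encode()).hexdigest(), 16) % ID_SPACE

def successor(key_id, ring):
    """First node with ID >= key (wrapping around the ring)."""
    for nid in ring:
        if nid >= key_id:
            return nid
    return ring[0]                     # wrap around to the smallest ID

nodes = sorted(node_id(f"node{i}") for i in range(5))
key = node_id("my-file.txt")
print(f"key {key} is stored on node {successor(key, nodes)}")
```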

Finding the node responsible for a key can be done with different strategies:

Hierarchical

Hierarchical distribution organizes names into a tree-like structure, where each subtree represents a directory and entities (such as files or services) are leaves.

In this model, each directory node maintains information about its children and parent. When resolving a name, the process follows these steps:

Once resolved, the address is returned along the path, and intermediate nodes can cache the result for future queries to improve performance.

This approach excels in local domains, where queries are confined to nearby branches, reducing global network traffic. However, it can suffer from bottlenecks at higher-level nodes if not balanced properly.

Structured Naming

Structured Naming organizes system entities into a Name Space, a logical structure. Unlike flat naming, the name itself carries information about the entity’s position within the structure.

A name space is a labeled graph composed of:

An entity is uniquely identified by a Path Name, a sequence of directory node labels that traces a route from the root to the leaf node.

Inside the name space, there can be:

An example of a structured name space is a filesystem.

graph TD
  A[root] --> B[home]
  A --> C[etc]
  A --> D[var]
  B --> E[user1]
  B --> F[user2]
  E --> G[documents]
  E --> H[pictures]
  G --> I[file1.txt]
  G --> J[file2.txt]
  H --> K[image1.jpg]
  F --> L[downloads]
  L --> M[app.exe]

Partitioning

Thanks to the structured nature of the name space, it is possible to partition it into layers, so that each host needs to know only part of the name space.

Resolution

The resolution can happen with two methods: iterative (the client queries each server in turn, following the referrals it receives, as in the first diagram below) or recursive (each server forwards the query to the next server and the answer travels back along the same path, as in the second diagram).

sequenceDiagram
  participant Client
  participant Root Server
  participant Intermediate Server
  participant Leaf Server

  Client->>Root Server: Query for name resolution
  Root Server-->>Client: Referral to Intermediate Server
  Client->>Intermediate Server: Query for name resolution
  Intermediate Server-->>Client: Referral to Leaf Server
  Client->>Leaf Server: Query for name resolution
  Leaf Server-->>Client: Resolved address

sequenceDiagram
  participant Client
  participant Root Server
  participant Intermediate Server
  participant Leaf Server

  Client->>Root Server: Query for name resolution
  Root Server->>Intermediate Server: Query for name resolution
  Intermediate Server->>Leaf Server: Query for name resolution
  Leaf Server-->>Intermediate Server: Resolved address
  Intermediate Server-->>Root Server: Resolved address
  Root Server-->>Client: Resolved address

Higher-level directory nodes change infrequently, so their resolution results can be cached for long durations (high TTL). Lower-level nodes must update immediately for system changes, requiring short TTLs.

Root servers are heavily mirrored worldwide and often share the same IP address. IP Anycast routes a request to the nearest operational server, distributing the load and improving global response time.

Structured vs flat name

DNS

An example of distributed name system is the DNS.

The DNS nodes could be of different types:

The DNS doesn’t work well with mobility.

Attribute Based Naming

Attribute-Based Naming allows entities to be located using a set of descriptive attributes (properties) instead of, or in addition to, their fixed names.

This approach is essential for large, complex systems where users often search for entities based on their characteristics. These systems are commonly referred to as Directory Services and are implemented using DBMS technologies.

The entire collection of entities and attributes is called the Directory Information Base (DIB).

The entities are organized in a hierarchical structure known as the Directory Information Tree (DIT), similar to structured naming.

It’s possible to perform two operations:

LDAP

An example of attribute based naming system is LDAP (Lightweight Directory Access Protocol) where each entity is identified by a Distinguished Name (DN).

An example is: cn=John Smith,ou=Engineering,o=ExampleCorp,c=US

Where:

Remove Unreferenced Entities

Distributed Garbage Collection is the process of automatically identifying and reclaiming memory or resources occupied by remote objects (skeletons) that are no longer referenced by any active proxy in the distributed system.

Reference count

Each remote object’s skeleton maintains a counter representing the total number of active proxies (clients) that hold a reference to it. When the count reaches zero, the object is eligible for collection.

Some problems are:

To fix the race condition, proxy A should send a message to skeleton O to notify it of the transfer of the reference to proxy B. Then O should send an ack to B to confirm the transfer.

Weighted Reference Counting

To reduce the number of messages needed to manage the reference count, a weighted reference counting scheme can be used.

Each skeleton starts with a total weight (e.g., 256). When a proxy connects to the skeleton, it receives half of the current weight.

When a proxy disconnects, it returns its weight back to the skeleton.

When a proxy sends a reference to another proxy, it sends half of its current weight along with the reference.

This removes the problem of the growing number of messages, but limits the maximum number of proxies that can reference a skeleton, since the weight can only be halved a finite number of times.
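A minimal sketch of the weight-splitting rules described above (the initial weight and class names are illustrative):

```python
class Skeleton:
    def __init__(self, total_weight=256):
        self.total = total_weight       # weight kept locally + handed out
        self.local = total_weight       # weight not yet handed to proxies

    def new_proxy(self):
        given = self.local // 2         # hand out half of the local weight
        self.local -= given
        return Proxy(self, given)

    def return_weight(self, w):
        self.total -= w
        if self.total == self.local:    # all distributed weight has come back
            print("skeleton can be collected")

class Proxy:
    def __init__(self, skeleton, weight):
        self.skeleton = skeleton
        self.weight = weight

    def duplicate(self):
        """Pass a reference to another proxy without contacting the skeleton."""
        half = self.weight // 2
        self.weight -= half
        return Proxy(self.skeleton, half)

    def release(self):
        self.skeleton.return_weight(self.weight)
        self.weight = 0

s = Skeleton()
a = s.new_proxy()        # a.weight == 128
b = a.duplicate()        # no message to the skeleton is needed
a.release()
b.release()              # prints "skeleton can be collected"
```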

Reference Listing

The skeleton maintains a complete list of all proxies that currently hold a reference.

When a proxy disconnects, it sends a message to the skeleton to remove itself from the list.

The skeleton can periodically check the liveness of each proxy in the list (e.g., sending heartbeat messages). If a proxy is found to be unresponsive, it is removed from the list.

When the list becomes empty, the skeleton can be safely collected.

This approach is robust against unreliable channels, but the problem of race conditions when transferring references remains.

Distributed Mark-and-Sweep

To detect disconnected entities from the root, a mark-and-sweep algorithm can be used.

In a centralized system, the garbage collector starts from the root and marks all reachable objects. Then it sweeps through all objects and deletes those that are unmarked.

In a distributed system, each node performs the mark-and-sweep algorithm locally. The process involves:

Synchronization

Synchronization is essential for ordering events and coordinating actions across multiple nodes. Each node relies on an imperfect local physical clock (a quartz crystal oscillator) which inevitably suffers from clock drift (a deviation from the true time, about 1 second every 11.6 days).

To manage this, clocks must be periodically synchronized to ensure consistency across the distributed system, by:

When a clock correction is needed, simply jumping the time backward can cause major issues. To avoid this, the system should maintain monotonicity by slowing down or speeding up the clock until it reaches the correct time.

Physical Clock

Physical clocks provide a way to measure real-world time, but due to clock drift, they need to be synchronized periodically.

Global Positioning System (GPS)

GPS provides an accurate way to synchronize clocks across the globe.

Each GPS satellite uses an atomic clock to keep time, and the satellites are synchronized with each other. They periodically broadcast their current timestamp.

A receiver on the ground must receive timestamps from at least 4 satellites to calculate its position and the current time.

Since the signal travels at the speed of light, the receiver can calculate how long the signal took to arrive and adjust its clock accordingly, with a precision of a few nanoseconds even with a position error of 10 meters.

Cristian’s Algorithm

In the network there is a time server that provides the correct time, and clients can request it periodically.

The client sends a request at time T_0 and receives the response at time T_1, with the server time T_s inside.

Then the client sets its clock to T_s + \frac{T_1 - T_0}{2}; this adjusts the time to account for the transmission delay.

This approach assumes that the network delay is symmetric and the processing time on the server is negligible or known.
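A minimal sketch of the client-side adjustment under those assumptions; the time server here is just a local function standing in for the network round trip:

```python
import time

def time_server_now():
    """Stand-in for a request to the time server; in reality this would be
    a network round trip returning the server's clock value T_s."""
    return time.time() + 2.5           # pretend the server is 2.5 s ahead

def cristian_sync():
    t0 = time.time()                   # send time (client clock)
    t_server = time_server_now()       # server timestamp T_s
    t1 = time.time()                   # receive time (client clock)
    rtt = t1 - t0
    # Assume symmetric delay: the reply travelled for rtt / 2 seconds.
    adjusted = t_server + rtt / 2
    return adjusted, rtt

adjusted, rtt = cristian_sync()
print(f"estimated current time: {adjusted:.6f} (rtt = {rtt*1e3:.3f} ms)")
```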

Berkeley’s Algorithm

This algorithm is used when there is no accurate time server and all the machines need to sync among themselves (agreement).

A master node periodically polls all the other nodes for their current time and calculates the average time.

With this average time, the master node calculates the offset for each node (including itself) and sends the offset to each node. Each node adjusts its clock by the received offset.

Network Time Protocol (NTP)

NTP is the internet standard for clock synchronization.

It uses a hierarchical system of time sources, with stratum 0 being high-precision timekeeping devices (like atomic clocks), stratum 1 being directly connected to stratum 0 devices, and so on.

Each stratum synchronizes with the stratum above it (lower number), and precision decreases as the stratum number increases.

The synchronization can be done in broadcast, with procedure-call or symmetric mode.

NTP uses four timestamps collected over two pairs of messages to accurately calculate the offset (the difference between clocks) and the delay (round-trip time), minimizing the impact of asymmetric latency:

The offset (\theta) and delay (\delta) are calculated as:

\theta = \frac{(T_2 - T_1) + (T_3 - T_4)}{2}

\delta = (T_4 - T_1) - (T_3 - T_2)
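Plugging illustrative timestamps into the two formulas above (T_1 and T_4 read on the client's clock, T_2 and T_3 on the server's) gives, as a minimal sketch:

```python
def ntp_offset_delay(t1, t2, t3, t4):
    """t1: client send, t2: server receive, t3: server send, t4: client receive."""
    offset = ((t2 - t1) + (t3 - t4)) / 2    # estimated clock difference
    delay = (t4 - t1) - (t3 - t2)           # round-trip network delay
    return offset, delay

# Illustrative values: the server's clock is ~5 ms ahead of the client's,
# with a 10 ms one-way network delay in each direction.
t1, t2, t3, t4 = 100.000, 100.015, 100.016, 100.021
theta, delta = ntp_offset_delay(t1, t2, t3, t4)
print(f"offset = {theta*1e3:.1f} ms, delay = {delta*1e3:.1f} ms")   # 5.0 ms, 20.0 ms
```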

Logical Clock

In a distributed system, there is no global clock to order events and physical clocks are unreliable. To solve this problem, logical clocks are used to provide a partial ordering of events based on their causality.

They rely on the happened-before relation (\rightarrow):

This relation creates a partial ordering of events in the distributed system. Partial ordering means that not all events can be compared; some events may be concurrent and have no defined order.

Scalar Clock

Scalar Clocks (or Lamport Timestamps) are the simplest logical clocks.

Each process maintains a local scalar clock L_i that starts at 0.

When an event e occurs, it is assigned a timestamp L(e) based on the local clock and the clock is increased.

When a process receives an event from another process, it sets its own clock to: max(current, received) + 1.

To obtain a total order between events, the process ID is appended to the timestamp as a tie-breaker (clock.id).
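A minimal sketch of the three update rules (local event, send, receive), with the process ID used only as a tie-breaker (class and method names are assumptions):

```python
class LamportClock:
    def __init__(self, process_id):
        self.time = 0
        self.pid = process_id

    def local_event(self):
        self.time += 1
        return (self.time, self.pid)          # timestamp of the event

    def send(self):
        self.time += 1
        return self.time                      # piggybacked on the message

    def receive(self, msg_time):
        self.time = max(self.time, msg_time) + 1
        return (self.time, self.pid)

p1, p2 = LamportClock(1), LamportClock(2)
p1.local_event()                # P1: (1, 1)
t = p1.send()                   # P1 sends with timestamp 2
print(p2.receive(t))            # P2: (3, 2) = max(0, 2) + 1, pid breaks ties
```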

To force an order between multiple processes (over a reliable FIFO channel) it is possible to:

Vector Clock

Vector Clocks solve the limitation of scalar clocks by using a vector of integers to capture the full causality history. This allows them to correctly capture both causality and concurrency.

Each process P_i maintains a vector V_i of size N (the total number of processes) where:

This creates an ordering:

This guarantees that e \rightarrow e' \Leftrightarrow V(e) < V(e').

Messages are sent together with the sender's vector clock. When process P_k receives a message m from P_i, if V_m[i] > V_k[i] + 1, or V_m[j] > V_k[j] for some j \neq i, then some causally preceding messages are still missing and delivery can be delayed until they arrive. Thanks to this property it is possible to work over unreliable or non-FIFO channels.
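A minimal sketch of the vector-clock update rules and of the comparison that distinguishes causality from concurrency (indices and helper names are assumptions):

```python
def vc_leq(a, b):
    """a 'happened before or equals' b if every component of a is <= b."""
    return all(x <= y for x, y in zip(a, b))

def relation(a, b):
    if a == b:
        return "equal"
    if vc_leq(a, b):
        return "a -> b (causally before)"
    if vc_leq(b, a):
        return "b -> a (causally before)"
    return "concurrent"

N = 3                                   # number of processes
clock = [[0] * N for _ in range(N)]     # one vector per process

def local_event(i):
    clock[i][i] += 1
    return list(clock[i])

def send(i):
    clock[i][i] += 1
    return list(clock[i])               # vector piggybacked on the message

def receive(i, msg_vc):
    clock[i] = [max(x, y) for x, y in zip(clock[i], msg_vc)]
    clock[i][i] += 1
    return list(clock[i])

e1 = local_event(0)        # [1, 0, 0]
m = send(0)                # [2, 0, 0]
e2 = receive(1, m)         # [2, 1, 0]
e3 = local_event(2)        # [0, 0, 1]
print(relation(e1, e2))    # a -> b (causally before)
print(relation(e2, e3))    # concurrent
```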

Mutual Exclusion

Mutual Exclusion is a concurrency control mechanism that ensures that in a distributed system, a shared resource is accessed by at most one process at any given time, preventing interference and maintaining system consistency.

There are three properties that must be guaranteed:

Mutual Exclusion with Centralized server

Having a central server that manages access to the resource is the simplest approach.

Each process that wants to access the resource sends a request to the server. The server maintains a queue of requests and grants access to one process at a time.

This is a simple approach but the server is a single point of failure and a bottleneck.

Mutual Exclusion with Token ring

Processes are organized into a logical ring topology, where each process P_i knows its successor P_{i+1}.

A single token circulates continuously around the ring. When a process wants to enter the critical section, it waits for the token to arrive. Once it receives the token, it enters the critical section, performs its operations, and then passes the token to its successor in the ring.

This approach can be inefficient as the token must circulate even when no process needs access, leading to unnecessary delays. If the token is lost, a recovery mechanism is needed to regenerate it to avoid deadlocks.

Mutual Exclusion with Scalar Clock

When a process wants to access the resource it sends a request message to all the other processes with its current scalar clock.

Once a process P_j receives the request of P_i it will act as follows:

A process can enter the critical section only after receiving an ack from all the other processes.

This approach needs 2(N-1) messages per critical-section entry, making it inefficient for large systems.
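Since the list of rules above is abbreviated, the following sketch assumes the standard Ricart–Agrawala variant: a process acks a request immediately unless it has its own pending request with a smaller (timestamp, id) pair, in which case the ack is deferred until it leaves the critical section. Names are illustrative.

```python
class Process:
    def __init__(self, pid, n):
        self.pid, self.n = pid, n
        self.clock = 0
        self.requesting = False
        self.my_request = None          # (timestamp, pid) of my pending request
        self.acks = 0
        self.deferred = []              # processes to ack after leaving the CS

    def request_cs(self):
        self.clock += 1
        self.requesting = True
        self.my_request = (self.clock, self.pid)
        self.acks = 0
        return self.my_request          # broadcast to all other processes

    def on_request(self, ts, sender):
        self.clock = max(self.clock, ts) + 1
        # Defer the ack if I have a pending request with higher priority
        # (smaller timestamp, pid as tie-breaker); otherwise ack at once.
        if self.requesting and self.my_request < (ts, sender):
            self.deferred.append(sender)
            return None
        return "ack"

    def on_ack(self):
        self.acks += 1
        return self.acks == self.n - 1  # True: all acks received, enter CS

    def release_cs(self):
        self.requesting = False
        to_ack, self.deferred = self.deferred, []
        return to_ack                   # send the deferred acks now

procs = [Process(i, 3) for i in range(3)]
ts, pid = procs[0].request_cs()
for p in procs[1:]:
    if p.on_request(ts, pid) == "ack":
        print("enter CS" if procs[0].on_ack() else "waiting")
```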

Leader election

Many distributed algorithms need a coordinator node to manage the execution. The coordinator is called the leader and should be acknowledged by all the processes.

Typically, each node has a unique ID known to all the other nodes.

When a node detects the crash of the leader, it starts a leader election algorithm to choose a new leader, which should be the node with the greatest ID.

To do so, the links must be reliable and it must be possible to detect crashes, which requires a bound on transmission time (complicated by clock drift, variable transmission times, etc.).

Bully Election

When a node P detects the crash of the leader, it sends an election message to all the nodes with a greater ID, starting its own election.

If a node P' receives the election message and its ID is greater than P's, it replies to P with a message to stop P's election and starts its own election.

If the node P doesn’t receive any reply it means that it’s the node with the greater id and it will send a coordinator message to all the nodes to inform them that it’s the new leader.

When a node comes back online, it starts an election to check whether it is the node with the greatest ID.
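A minimal simulation of the resulting message pattern, assuming reliable links and an alive/crashed flag per node (the function and variable names are assumptions):

```python
def bully_election(node_ids, alive, initiator):
    """Return the ID announced as coordinator when `initiator` starts an
    election. `alive` maps node ID -> True/False (crashed)."""
    higher = [n for n in node_ids if n > initiator and alive[n]]
    if not higher:
        # No live node with a greater ID answered: the initiator wins and
        # broadcasts the coordinator message.
        return initiator
    # Every live higher node "bullies" the initiator and starts its own
    # election; the recursion bottoms out at the highest live ID.
    return max(bully_election(node_ids, alive, n) for n in higher)

nodes = [1, 2, 3, 4, 5]
alive = {1: True, 2: True, 3: True, 4: True, 5: False}  # old leader 5 crashed
print(bully_election(nodes, alive, initiator=2))         # -> 4
```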

Ring Based Election

Nodes are organized in a logical ring topology.

When a node detects the crash of the leader, it sends an election message to its successor.

When a node receives the election message, it checks the ID inside:

When a node receives the coordinator message, it updates its leader ID with the greatest ID contained in the message.

Collecting Global State

It’s important to collect the global state of a distributed system to perform tasks like checkpointing, debugging, and detecting deadlocks. Since there is no global clock, it’s impossible to take a snapshot of all the processes at the same time.

A Distributed Snapshot represents the state of all the processes and communication channels at a specific point in time.

The snapshot must be consistent, meaning that it accurately reflects the state of the system without contradictions (e.g., if P1 saves its state and then sends a message to P2, and P2 receives the message before saving its own state, the snapshot would record a message as received but never sent).

The conceptual tool representing the global state is the cut: a line that intersects each process's timeline at a specific point.

A cut is consistent if for any event e that is included in the cut, all events that happened before e are also included in the cut.

Distributed Snapshot

The Chandy-Lamport Distributed Snapshot Algorithm is the standard method for recording a globally consistent state, assuming communication links are FIFO and reliable.

The algorithm uses a special marker message to indicate the start of the snapshot process.

Any process P_i can initiate the snapshot by atomically executing the following steps:

Once a process P_j receives a Marker on an incoming channel, it will:

Once a process has received a Marker on all its incoming channels, it completes its part of the snapshot, which can then be sent to a collector.

It’s possible to start multiple snapshot by distinguishing the snapshot with an id.

Termination detection

The Dijkstra-Scholten algorithm is used to detect the termination of a diffusing computation, a computation where all activity starts from a single initiator and spreads by message passing.

The algorithm organizes processes into a tree structure rooted at the initiator.

When a process P_i sends a message to initialize the processing to another process P_j, if P_j is not already part of the tree, it sets P_i as its parent.

Once a child becomes passive and has no children left, it informs its parent so that it can be removed from the parent's children list.

When the root process has no children and is passive, it declares the computation terminated.

Distributed Transaction

A Transaction is a sequence of operations (read and write) that transforms resources from one consistent state to another. In a distributed system, a Distributed Transaction is a single logical transaction whose operations are executed across multiple, independent nodes.

Look at Database 2 for more information about transactions in a single node.

There are three types of transactions:

To achieve atomicity:

Concurrency is managed by components across the distributed nodes:

Two-Phase Locking (2PL)

This algorithm guarantees serializability by requiring transactions to acquire locks on data items before access.

The transaction has two phases:

For more information about 2PL look at Database 2.

This can be implemented with:

Timestamp

This algorithm avoids deadlocks by giving each transaction T a unique timestamp (ts(T) - scalar clock) at its start.

Each record has a timestamp of the latest transaction that performed a read \text{ts}_\text{read} and a write \text{ts}_\text{write} on that record.

otherwise the operation, and transaction, is aborted.

For more information about timestamp look at Database 2.

Deadlock

Deadlocks occur when a set of transactions is cyclically waiting for resources held by others.

Deadlock can be handled in different ways:

Detection

Each machine maintains a local wait-for graph (WFG) representing the dependencies between the local transactions.

Centralized Detection

The simplest way is to have a centralized node that collects the global WFG and checks for cycles periodically.

Collection can be done in different ways:

Distributed Detection

In a distributed system without a coordinator, the Chandy-Misra-Haas algorithm is used.

When a transaction T_i gets blocked waiting for a resource held by T_j, T_i initiates detection by sending a Probe Message to T_j.

T_j forwards the message to the transaction holding the resource it needs, and so on.

If the probe message comes back to the initiator, a cycle (deadlock) has been detected, and it can be resolved by:

Prevention

Prevention ensures that the WFG can never contain a cycle. This is often done using transaction timestamps to define a priority among transactions.

There are two main methods:

Wait-Die

If an older transaction T_i requests a resource held by a younger transaction T_j, T_i waits for T_j to release the resource.

If a younger transaction T_i requests a resource held by an older transaction T_j, T_i is aborted (dies) and restarted with the same timestamp.

Wound-Wait

If an older transaction T_i requests a resource held by a younger transaction T_j, T_j is aborted (wounded) and T_i acquires the resource.

If a younger transaction T_i requests a resource held by an older transaction T_j, T_i waits for T_j to release the resource.
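A small sketch of the two rules side by side, where a smaller start timestamp means an older transaction (helper names are illustrative):

```python
def wait_die(ts_requester, ts_holder):
    """Older requester waits; younger requester dies (is aborted)."""
    return "wait" if ts_requester < ts_holder else "abort requester"

def wound_wait(ts_requester, ts_holder):
    """Older requester wounds (aborts) the holder; younger requester waits."""
    return "abort holder" if ts_requester < ts_holder else "wait"

# Transaction with timestamp 10 is older than the one with timestamp 20.
print(wait_die(10, 20))     # wait            (older waits for younger)
print(wait_die(20, 10))     # abort requester (younger dies)
print(wound_wait(10, 20))   # abort holder    (older wounds younger)
print(wound_wait(20, 10))   # wait            (younger waits for older)
```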

Fault Tolerance

Fault Tolerance is the ability of a system to continue operating without interruption despite the failure of one or more of its components (a partial failure). The goal is to mask these failures from the client, ensuring the system remains dependable:

Failure Classification

There are different types of failures:

Failure Detection

In a synchronous system, it’s possible to detect a failure by setting a time bound on message delivery and process execution. In asynchronous system, it’s impossible distinguish between a slow process/channel, a failed process, and unreliable communication.

To mitigate false positives and confirm failures, it is possible to use heartbeat messages, sent after a certain inactivity period; the acknowledgement indicates that the process is alive.

Fault tolerance is fundamentally achieved through redundancy:

Client Failure

When a client process (which initiated a remote computation) crashes, the server process performing the work is left with no one to deliver the result to. Such a computation is called an Orphan Computation.

Protection against failure

Process Resilience

Process Resilience is achieved through redundancy, ensuring that the failure of a single process does not halt the computation. This involves executing the computation across multiple processes so that a backup can take over upon failure.

This can be done with two methods:

To guarantee that a system can tolerate up to k process failures (k-resilience), a minimum number of replicas is required, depending on the failure type:

The consensus problem requires all non-faulty processes to agree on a single data value proposed by one or more processes.

The solution must satisfy three properties:

FloodSet Algorithm

The FloodSet Algorithm is a method to achieve consensus in a synchronous distributed system with omission failures.

  1. Each process has a set of proposed values. In each round, every process sends its current set of values to all other processes.
  2. Upon receiving sets from other processes, each process updates its own set by taking the union of all received sets.

To guarantee k-resilience, these steps are repeated for k + 1 rounds, ensuring that every non-faulty process receives every value.
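A minimal synchronous-round simulation of the algorithm, assuming at most k crash/omission failures and an agreed decision rule at the end (here the minimum value, which is an assumption):

```python
def floodset(initial_values, crashed_after_round, k):
    """initial_values: {pid: value}; crashed_after_round: {pid: round after
    which the process stops sending} (absent pid = never crashes)."""
    sets = {pid: {v} for pid, v in initial_values.items()}
    for rnd in range(1, k + 2):                      # k + 1 rounds
        received = {pid: set(s) for pid, s in sets.items()}
        for sender, s in sets.items():
            crash = crashed_after_round.get(sender)
            if crash is not None and rnd > crash:
                continue                             # crashed: sends nothing
            for receiver in sets:
                received[receiver] |= s              # union of received sets
        sets = received
    # Decision rule (assumed): every correct process picks the minimum value.
    return {pid: min(s) for pid, s in sets.items()
            if crashed_after_round.get(pid) is None}

values = {1: "a", 2: "b", 3: "c"}
crashes = {2: 0}                  # process 2 crashes before sending anything
print(floodset(values, crashes, k=1))   # all correct processes decide "a"
```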

To solve consensus with k Byzantine failures, the number of processes must be at least N = 3k + 1.

With asynchronous systems it’s impossible to reach an agreement with even one failure.

Reliable Group Communication

A multicast communication is reliable if:

Since network channels can lose or duplicate messages, reliability is enforced using acknowledgement strategies:

To scale and reduce the number of NACKs, it is possible to use:

Virtual Synchrony

The ideal behavior would be Close Synchrony, i.e., instantaneous and reliable communication, such that:

This is physically impossible when processes can fail, so it is approximated with Virtual Synchrony.

Virtual synchrony ensures that all non-faulty processes in a group see the same sequence of events (messages and membership changes) in the same order.

The system operates in sequential views, where a view is the current set of active, correct members. Virtual Synchrony ensures that messages sent within one view are only delivered to members of that view.

This could be implemented by:

Message Ordering

Messages can be received in three ways:

Each of these orderings can additionally be combined with a total order; such multicasts are called Atomic.

Recover Technique

After a failure, a process needs to be brought back to a correct state:

Backward recovery is done with two techniques: Checkpointing and Logging.

Checkpointing

Checkpointing is the act of periodically saving the internal state of a process to stable storage. The primary challenge is ensuring that the collection of local checkpoints and messages in transit forms a Consistent Global State (a consistent cut).

Uncoordinated Checkpoints

In this approach, processes take checkpoints independently, making the protocol simple during normal execution but complex during recovery.

Since checkpoints are independent, the system must track causal dependencies introduced by message exchange (A message m sent after Checkpoint C_i by P_{\text{sender}} and received before Checkpoint C_j by P_{\text{receiver}} means C_j depends on C_i).

A single process failure and rollback can force other processes to roll back to earlier checkpoints to maintain consistency (potentially cascading further and further back, the so-called domino effect).

The recovery can happen with two algorithms:

Coordinated Checkpoint

This method introduces synchronization between the processes to take a globally consistent checkpoint.

Logging

This is possible only when the system is piecewise deterministic (the execution can be reproduced).

All events are recorded to a stable log. After a crash, a process rolls back to a checkpoint and replays the log, ensuring the execution follows the same path up to the crash point.

A message is unstable if it is recorded only in volatile memory and could be lost in a crash. A message is stable if it has been written to persistent storage.

For each unstable message there are two types of relationships:

A process P_{\text{DEP}} becomes an orphan if all processes that hold a COPY of the unstable message crash. When this happens, P_{\text{DEP}} must be rolled back to a state before the dependency was created.

Logging protocols are classified based on how they handle the logging of messages:

Distributed Agreement

Commits

Atomic Commit ensures that a distributed transaction is either fully committed or fully aborted across all participating nodes, maintaining the atomicity property of transactions.

In consensus, nodes must agree on a single value, while in atomic commit, all nodes must vote to commit, otherwise the transaction is aborted.

Two Phase Commit (2PC)

The Two Phase Commit protocol is based on the election of a coordinator (which may or may not be a participant in the transaction).

During the transaction, the client communicates with all the participant nodes; once the transaction is over, the client sends a commit request to the coordinator, which starts the 2PC:

  1. The coordinator sends a prepare message to all the participants;
  2. Each participant votes to commit or abort and sends the vote to the coordinator;
  3. The coordinator collects all the votes and decides to global-commit if all the votes are commit, otherwise it decides to global-abort;
  4. The coordinator sends the global decision to all the participants;
  5. Each participant performs the action and sends an ack to the coordinator;
stateDiagram-v2

  c_init: Init
  c_wait: Wait
  c_commit: Commit
  c_abort: Abort

  state coordinator {
    [*] --> c_init
    c_init --> c_wait: receive commit request
    c_wait --> c_commit: all votes are commit
    c_wait --> c_abort: at least one vote is abort
  }

  p_init: Init
  p_ready: Ready
  p_commit: Commit
  p_abort: Abort

  state participant {
    [*] --> p_init
    p_init --> p_ready: receive prepare
    p_ready --> p_commit: receive global-commit
    p_ready --> p_abort: receive global-abort
    p_init --> p_abort: receive global-abort
  }

sequenceDiagram
  participant Client
  participant Coordinator
  participant Participant1
  participant Participant2

  Client->>Coordinator: Commit Request
  Coordinator->>Participant1: Prepare
  Coordinator->>Participant2: Prepare
  Participant1-->>Coordinator: Vote Commit
  alt All votes are Commit
    Participant2-->>Coordinator: Vote Commit
    Coordinator->>Participant1: Global Commit
    Coordinator->>Participant2: Global Commit
  else At least one vote is Abort
    Participant2-->>Coordinator: Vote Abort
    Coordinator->>Participant1: Global Abort
    Coordinator->>Participant2: Global Abort
  end
  Participant1-->>Coordinator: Ack
  Participant2-->>Coordinator: Ack
  Coordinator-->>Client: Transaction result

To detect failure, and make the system synchronous, a timeout is introduced.

After the coordinator sends the prepare message, it starts a timeout while waiting for all the votes. If at least one participant fails to respond, the coordinator aborts the transaction.

If the coordinator fails the participant can choose:

This protocol is Safe as it doesn’t leads to an incorrect state, but it doesn’t preserve liveness as the failure of the coordinator can block the participants indefinitely.

Three Phase Commit (3PC)

3PC improves on 2PC by splitting the commit into two phases, pre-commit and commit, to avoid blocking in case of coordinator failure.

stateDiagram-v2

  c_init: Init
  c_wait: Wait
  c_pre_commit: Pre Commit
  c_commit: Commit
  c_abort: Abort

  state coordinator {
    [*] --> c_init
    c_init --> c_wait: receive commit request
    c_wait --> c_pre_commit: all votes are commit, send pre-commit
    c_pre_commit --> c_commit: receive acks from all participants, send global-commit
    c_wait --> c_abort: at least one vote is abort
  }

  p_init: Init
  p_ready: Ready
  p_pre_commit: Pre Commit
  p_commit: Commit
  p_abort: Abort

  state participant {
    [*] --> p_init
    p_init --> p_ready: receive prepare
    p_ready --> p_pre_commit: receive pre-commit
    p_pre_commit --> p_commit: receive global-commit
    p_ready --> p_abort: receive global-abort
    p_init --> p_abort: receive global-abort
  }

If a participant fails during the init or wait state, the transaction is aborted as the coordinator cannot reach a decision.

If a participant fails during the pre-commit state, the coordinator already knows that all the participants voted to commit, so it can send the global-commit message and wait for the failed participant to recover.

If the coordinator fails, the participant can choose:

This protocol preserves both liveness and safety, but it increases the time needed and is more expensive.

CAP Theorem

A distributed system where data is replicated can have only two of the following three properties:

If there is a partition (P), the system cannot have both perfect consistency (C) and availability (A).

Replicated State Machines

The Replicated State Machine approach is a primary method for achieving fault tolerance in distributed systems. It structures the system as a collection of identical, deterministic servers (nodes) that maintain the same internal state.

The key idea is that if all nodes start in the same initial state and execute the exact same sequence of operations in the exact same order, they will end up in the exact same final state. This makes the entire collection of nodes appear to the client as a single, highly available machine.

The nodes can change the state machine if and only if all the replicas agree on the state.

Under the assumption of crash failures (no Byzantine behavior) and deterministic processes, it guarantees:

Raft

Raft is a consensus algorithm designed to be easier to understand and implement than its predecessor, Paxos.

Raft uses a leader that is responsible for:

The leader periodically sends messages to all the nodes: either requests from the client or keep-alive messages.

When a client sends a command to a follower, the follower redirects the client to the leader.

Leader Election

Raft defines three states for a node: Follower, Candidate, and Leader.

Raft divides time into numbered, sequential Terms. Each Term begins with an election, and if successful, one Leader serves for the rest of that Term.

All the nodes start as followers. If a follower doesn’t receive any message from the leader within a timeout, it becomes a candidate, increasing the term and starting an election.

The candidate will send a message to all the nodes requesting votes.

A node votes for the first candidate with an up-to-date log that requests its vote in a given term, and it resets its timeout.

If a candidate receives votes from the majority of the nodes, it becomes the leader for that term.

Consistency Check

When the leader sends Append Entries messages, it includes the entry that precedes the new one. A node must agree on that preceding entry; otherwise it rejects the request and the leader resends with an earlier portion of the log, until a point of consistency is found, after which the node updates its log.

Raft Architecture

Typically, a majority of the nodes is placed nearby to decrease the round-trip delay. There can be other nodes for disaster recovery, but they are not necessary for the computation.

To have a k-fault tolerant system, there should be at least 2k + 1 nodes, while for byzantine failure there should be at least 3k + 1 nodes.

Blockchain

A Blockchain is a distributed, decentralized, and immutable ledger (log) that utilizes cryptographic primitives and a consensus mechanism (like Proof-of-Work) to achieve Byzantine Fault Tolerance (BFT) without knowing the number of nodes or trusting any single entity.

To add a new block (commit a set of transactions), nodes (miners) must compete to solve a difficult, cryptographic puzzle (mining).

The puzzle requires significant computational power to solve. The difficulty automatically adjusts to maintain a target commit frequency, which is kept low to increase security.

Once a solution is found, the miner broadcasts the new block to the network. Other nodes verify the block’s validity (simple process) and add it to their local copy of the blockchain.

The puzzle links the current transaction data with the cryptographic hash of the previous block, making the chain immutable and securing the history of the ledger.

The longest valid chain is considered the authoritative version of the blockchain, ensuring consistency across the distributed network.

Replication

Replication is the strategy of storing copies of data on multiple machines (replicas) in a distributed system.

Replication provides several key benefits:

Most replication challenges consist of managing write conflicts (when two clients try to update the same data item simultaneously).

A Consistency Model is a contract between the processes and data store:

Some examples of models are:

Single Leader Consistency Protocols

A single replica is designated as the Leader, which is the only node responsible for all write operations. Followers handle read operations and receive updates from the Leader.

This protocol avoids write-write conflicts (read-write conflicts can still happen), since all writes are concentrated in a single node.

The updates from the leader to the followers can be done in different ways:

It’s possible to partition the protocol and give a leader for each partition to distribute the load.

Multi Leader Protocol

In this protocol there are multiple leaders that can handle write operations.

In this case write-write conflicts can happen, but the application should be able to handle them easily.

The workload is balanced between multiple nodes.

Leaderless Protocol

In this protocol there is no leader, all the nodes can handle read and write operations.

The client, or a proxy, is responsible for contacting multiple replicas to perform read and write operations.

This protocol uses quorum-based techniques to ensure consistency.

Data-Centric Consistency Model

Data-Centric Consistency Models define the rules for how updates to replicated data are ordered and observed by all client processes. These models represent a critical trade-off: stronger consistency (simpler programming) comes at the cost of higher latency and lower availability.

Based on the strength of the consistency guarantees, data-centric models can be classified into several categories:

Eventual Consistency

The Eventual Consistency model doesn’t guarantee anything about the order of the operations, it only guarantees the execution of the operations.

This is a good fit for read-heavy systems with few or no concurrent updates, for append-only data structures, or when the application has commutative semantics, allowing operations to be sent instead of values.

This is very easy to implement, with almost no overhead.

FIFO Consistency

The First-In-First-Out (FIFO) consistency model ensures that write operations done by a single process are perceived in the same order by all the other processes. Write operations from other processes can happen at any time.

This can be easily implemented by tagging each write operation with a per-process sequence number (scalar clock).

When a replica receives a write operation, it checks the sequence number to ensure that writes from the same process are applied in the correct order. If a write operation arrives out of order, the replica can buffer it until the missing operations are received.

An example is a chat application: messages sent by a user should be displayed in the order they were sent.
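A minimal sketch of the replica-side buffering described above: each write carries a per-sender sequence number, and out-of-order writes are held back until the gap is filled (class and method names are assumptions).

```python
class FifoReplica:
    def __init__(self):
        self.state = {}                 # key -> value
        self.next_seq = {}              # sender -> next expected sequence number
        self.buffer = {}                # sender -> {seq: (key, value)}

    def on_write(self, sender, seq, key, value):
        self.buffer.setdefault(sender, {})[seq] = (key, value)
        expected = self.next_seq.get(sender, 1)
        # Apply every buffered write that is now in order for this sender.
        while expected in self.buffer[sender]:
            k, v = self.buffer[sender].pop(expected)
            self.state[k] = v
            expected += 1
        self.next_seq[sender] = expected

r = FifoReplica()
r.on_write("alice", 2, "msg", "how are you?")   # arrives early: buffered
print(r.state)                                   # {}
r.on_write("alice", 1, "msg", "hello")           # fills the gap, both applied
print(r.state)                                   # {'msg': 'how are you?'}
```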

Causal Consistency

The Causal Consistency model ensures that all processes observe causally related write operations in the same order.

If a process P_1 performs a write operation W_1 and then another process P_2 reads the value written by W_1 and performs a write operation W_2, then all processes must observe W_1 before W_2.

This can be implemented by tracking the causal relationships between operations using vector clocks. A write operation is only applied at a replica if all causally preceding operations have been applied.

To work correctly, clients must be sticky to a single replica; otherwise a client might read a value that does not include its own previous write.

Sequential Consistency

The Sequential Consistency model ensures that the results of operations are consistent with some sequential order of execution across all processes.

Sequential consistency is expensive: even programming languages (Java, C++, etc.) do not guarantee it by default (a value can differ across cores' caches), since cache coherence is expensive, but it can be forced with synchronization.

Read operations can interleave with each other.

This can be achieved using two protocols, neither of which is highly available:

Sequential Consistency with Single Leader

Assumptions: no failures, FIFO network, sticky processes (each process always communicates with the same replica).

All writes go to a Leader, which totally orders them and propagates changes to all the replicas.

Sequential Consistency without Leaders

Uses a quorum system to force a consistent global ordering.

There are fixed values for the number of nodes to contact for reading (NR) and for writing (NW). These values must satisfy the following rules: NR + NW > N (read and write quorums always overlap) and NW > N/2 (two write quorums always overlap).

In this way, reading from NR replicas guarantees that at least one of them has the newest value, identifiable by its timestamp.

If NW were less than half of the total nodes then, since the timestamp is a scalar, two concurrent writes could end up with the same timestamp but different values.
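A minimal sketch of a quorum write and read under the conditions above (NR + NW > N and NW > N/2), where each replica stores a (timestamp, value) pair and the reader keeps the value with the highest timestamp among the NR answers:

```python
import random

N, NW, NR = 5, 3, 3                       # NR + NW > N and NW > N/2
replicas = [{"ts": 0, "value": None} for _ in range(N)]

def write(ts, value):
    # Contact NW replicas (chosen arbitrarily here) and install the new version.
    for i in random.sample(range(N), NW):
        if ts > replicas[i]["ts"]:
            replicas[i] = {"ts": ts, "value": value}

def read():
    # Contact NR replicas; at least one of them overlaps with the last write
    # quorum, so the highest timestamp identifies the newest value.
    answers = [replicas[i] for i in random.sample(range(N), NR)]
    return max(answers, key=lambda r: r["ts"])["value"]

write(ts=1, value="v1")
write(ts=2, value="v2")
print(read())                             # always "v2" thanks to quorum overlap
```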

The consistency can be maintained by two protocols:

Linearizability

The Linearizability model ensures that all operations appear to occur instantaneously at some point between their invocation and their response.

This is the strongest consistency model, as it guarantees real-time ordering of operations.

All write operations are handled by a single leader, but when it propagates the changes, the replicas lock the resource, preventing reads of the new value until the update is complete across all the replicas.

Once the leader receives the acks from all the replicas, it sends an unlock message to all of them.

Client Centric Consistency Model

A client centric model guarantees some properties from the point of view of a single client, regardless of the replica it is connected to.

Client models can have four properties that must apply regardless of the replica:

Each operation has a unique ID that the client stores; the servers are stateless.

When connecting to a replica, the client sends the ID of the last operation it performed. If the replica has not yet received that operation, it waits to respond until it has received the latest data.

Guaranteeing these properties provides causal consistency, moving some of the complexity from the server to the client.

Design Strategies

Designing a distributed system with replicated data involves making strategic choices that balance fault tolerance, performance, and consistency.

Replica placement

The placement of replicas can significantly impact the system’s performance and fault tolerance. Some common strategies include:

Update Propagation

When a write operation occurs on a primary replica, the system must decide what information to send to the other replicas to synchronize their states. The choices include:

Distributed Data Processing

Distributed Data Processing enables the efficient processing of large volumes of data (that cannot be stored on a single node) by partitioning the data and distributing the computation across distributed nodes.

It’s also possible to separate a problem into multiple sub-problems and use the output as input for the next problem.

Map Reduce

Map-Reduce is a programming model for processing large data sets in a distributed system, developed by Google.

The model has a master node that coordinates the work and multiple worker nodes that perform the tasks.

There are two types of tasks: Map tasks and Reduce tasks.

  1. The input data, typically stored in a distributed file system with a block size of 64 MB, is split into M partitions.
  2. The master handles the scheduling and assigns each partition to a free worker node (based on data locality) to perform the map task.
  3. Each worker processes its assigned partition, producing intermediate <key, value> pairs.
  4. The intermediate data is grouped by key.
  5. The master assigns reduce tasks to worker nodes, each responsible for processing a subset of the keys.
  6. Each reduce worker processes its assigned keys and their associated values, producing the final output.
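As a minimal, single-machine sketch of the programming model (not of the distributed runtime), the classic word-count example: map_fn emits intermediate <key, value> pairs, the pairs are grouped by key, and reduce_fn aggregates each group.

```python
from collections import defaultdict

def map_fn(document):
    """Map task: emit a <word, 1> pair for every word in the input split."""
    for word in document.split():
        yield word.lower(), 1

def reduce_fn(word, counts):
    """Reduce task: aggregate all the values associated with one key."""
    return word, sum(counts)

def map_reduce(splits):
    # Map phase: each split would be processed by a different worker.
    intermediate = defaultdict(list)
    for split in splits:
        for key, value in map_fn(split):
            intermediate[key].append(value)      # group (shuffle) by key
    # Reduce phase: each key (or key range) goes to a reduce worker.
    return dict(reduce_fn(k, v) for k, v in intermediate.items())

splits = ["the quick brown fox", "the lazy dog", "the fox"]
print(map_reduce(splits))   # {'the': 3, 'quick': 1, 'brown': 1, 'fox': 2, ...}
```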

Map-Reduce Fault Tolerance

Since both Map and Reduce operations are deterministic and stateless (they only rely on input data), failure recovery is simple: if a Worker fails, the Master simply re-assigns the task to another free Worker.

The data is stored in a distributed file system that replicates the data across multiple nodes, ensuring that if a node fails, the data can still be accessed from another replica (two locally and one remotely).

The Master monitors completion times. If a task is running slower than its peers (Stragglers), the Master executes a duplicate of that task on another free Worker. The result from the first Worker to finish is accepted, and the other execution is discarded.

Dataflow Platform

This programming model is an improvement over the MapReduce paradigm: the computation is modeled as a DAG (Directed Acyclic Graph), where the nodes represent operations (like Map, Filter, Join) and the edges represent the flow of data between them.

Dataflow Scheduling

This model focuses on processing bounded datasets (batches): an operation cannot start until the previous one has processed the entire dataset. An example is Apache Spark.

The output of an operation is stored in an external storage (like a distributed file system) before being used as input for the next operation.

The computation is divided into multiple Tasks that can be grouped into Stages if there is no shuffle of data between them.
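As a sketch (assuming a PySpark installation and a hypothetical input.txt file), the narrow transformations below can be grouped into a single stage, while reduceByKey requires a shuffle and therefore starts a new stage:

```python
# Word count on a bounded dataset with PySpark (illustrative sketch).
from pyspark import SparkContext

sc = SparkContext(appName="wordcount")

counts = (
    sc.textFile("input.txt")                 # read the bounded dataset
      .flatMap(lambda line: line.split())    # narrow transformation
      .map(lambda word: (word, 1))           # narrow transformation (same stage)
      .reduceByKey(lambda a, b: a + b)       # shuffle: a new stage begins here
)
print(counts.collect())
```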

Scheduling gives some advantages:

The disadvantages of scheduling are:

Dataflow Pipeline

This model focuses on low-latency processing of unbounded datasets (streams) where each operation can start processing data as soon as it becomes available from the previous operation. An example is Apache Flink.

The output of one task is directly piped to the input of the next task via in-memory data structures or network protocols (like TCP). Data is processed tuple-by-tuple or in small buffers, and there is no required intermediate step to stable external storage.

All the resources are statically allocated at the beginning of the computation.
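A toy illustration of pipelining using Python generators: each operator consumes tuples from the previous one as soon as they are produced, with no intermediate storage (the operators are made up for the example):

```python
# Each operator is a generator that processes data tuple-by-tuple.

def source():
    for i in range(5):
        yield i                       # emit one tuple at a time

def mapper(stream):
    for x in stream:
        yield x * x                   # processed immediately, not per batch

def stream_filter(stream):
    for x in stream:
        if x % 2 == 0:
            yield x

for result in stream_filter(mapper(source())):
    print(result)                     # 0, 4, 16 are printed as they flow through
```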

The advantages of the pipeline model are:

The disadvantages are:

Stream Processing

In stream processing, data is treated as a continuous flow rather than a static dataset. This model is characterized by low-latency requirements.

Since a data stream is theoretically infinite, processing is done over finite segments called windows. A window isolates a portion of the stream and defines when a computation should be performed.

The window can be defined in two ways: based on time (e.g. the tuples received in the last 10 minutes) or based on count (e.g. the last 100 tuples).

Some parameters are the window size (how much of the stream the window covers) and the slide (how often a new window is opened).

The relationship between the window’s size and its slide determines the window type: when the slide equals the size the windows are tumbling (no overlap), while when the slide is smaller than the size the windows are sliding (overlapping).

Stream Time

The time associated with the data matters, and the moment the data is generated can differ from the moment it is processed. This leads to two notions of time: event time (when the data was produced at the source) and processing time (when the data is handled by the system).

When using event time, there is a need to handle out-of-order data (data that arrives late). This can be done by keeping the window open until a watermark (a message that indicates no more events with a timestamp earlier than the watermark will arrive) is reached.
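A simplified sketch of an event-time tumbling window closed by a watermark (the window size, function names, and in-memory layout are assumptions of the example):

```python
# Tumbling event-time windows of 10 time units, fired by watermarks.

from collections import defaultdict

WINDOW_SIZE = 10
windows = defaultdict(list)           # window start -> buffered values

def on_event(event_time, value):
    start = (event_time // WINDOW_SIZE) * WINDOW_SIZE
    windows[start].append(value)

def on_watermark(watermark):
    # Fire every window whose end is not after the watermark: no events with
    # an earlier timestamp will arrive anymore.
    for start in sorted(list(windows)):
        if start + WINDOW_SIZE <= watermark:
            total = sum(windows.pop(start))
            print(f"window [{start}, {start + WINDOW_SIZE}): sum = {total}")

on_event(3, 1)
on_event(12, 2)
on_event(7, 5)          # out-of-order but still before the watermark: accepted
on_watermark(10)        # closes window [0, 10)  -> sum = 6
on_event(15, 4)
on_watermark(20)        # closes window [10, 20) -> sum = 6
```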

Stream Implementation

Using the Dataflow model it is possible to implement stream processing with both the scheduling and the pipeline approaches.

Peer-To-Peer

Peer-to-Peer (P2P) is a decentralized communication paradigm where resources are distributed and shared among participants (peers) at the edge of the network, rather than relying solely on centralized servers.

This architecture is in contrast with the Client-Server model, which suffers from poor scalability (central server becomes a bottleneck) and high vulnerability (Single Point of Failure).

P2P allows peers to share different kinds of resources: network bandwidth, processing cycles, storage space, and data.

All nodes act as both users (clients) and providers (servers) of services and resources.

Nodes are inherently unreliable and unpredictable as they can join and leave the network at any time.

Nodes are connected via virtual links that form a logical Overlay Network, which is independent of the underlying physical network topology.

Retrieving resources involves two operations: locating the peer that stores the resource (lookup) and transferring the resource from that peer.

Centralized Directory

This system (e.g. Napster) uses a central server that maps the metadata of the resources to the peers that store them.

When a peer joins the network it contacts the central server to register its resources.

To retrieve a resource, the peer sends a query to the central server, which responds with the location of the peer that stores it. Then the peer contacts that peer directly to retrieve the resource.

The presence of a central server allows for a fast search operation (O(1)), but the server is a single point of failure and a bottleneck for scalability.
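A minimal sketch of such a central index, modeled as a dictionary from resource names to the peers that registered them (peer addresses are made up; the direct transfer between peers is not shown):

```python
# Central index: the server only stores metadata about who has what.

index = {}   # resource name -> set of peer addresses

def register(peer, resources):
    for name in resources:
        index.setdefault(name, set()).add(peer)

def lookup(name):
    # O(1) search on the central server.
    return index.get(name, set())

register("peer-a:6881", ["song.mp3", "doc.pdf"])
register("peer-b:6881", ["song.mp3"])
print(lookup("song.mp3"))   # {'peer-a:6881', 'peer-b:6881'}
```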

Query Flooding

This system (e.g. Gnutella) uses a fully decentralized approach where each node is equal and is linked to some neighbors.

To enter the network, a node must know at least one node already in the network; it connects to it and sends a ping message that is flooded across the network. A node that receives the ping may reply with a pong message to offer a link to the new node, increasing resilience.

To support retrieval, each node maintains a local index of the resources it stores.

When a node wants to search for a resource, it sends a query to its neighbors; the query is flooded across the network until the TTL expires. If a node has the resource, it replies to the requesting node.

The search scope is O(n), the search time is O(2d) (query plus response), and there is no guarantee of finding the resource within that scope.

This is a fully decentralized approach, but it doesn't scale well, as the number of messages grows with the number of nodes.
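A toy simulation of the flooding search with a TTL over a small hand-made overlay (the topology, and the visited set used to avoid re-processing the same query, are assumptions of the sketch):

```python
# Query flooding: each node forwards the query to its neighbours until the
# TTL expires or the resource is found.

neighbours = {                       # hypothetical overlay topology
    "A": ["B", "C"],
    "B": ["A", "D"],
    "C": ["A", "D"],
    "D": ["B", "C"],
}
stored_at = {"D": {"file.txt"}}      # local indexes

def flood(node, resource, ttl, visited=None):
    visited = visited or set()
    if node in visited or ttl < 0:
        return set()
    visited.add(node)
    hits = {node} if resource in stored_at.get(node, set()) else set()
    for n in neighbours[node]:
        hits |= flood(n, resource, ttl - 1, visited)
    return hits

print(flood("A", "file.txt", ttl=2))   # {'D'}: found within the TTL horizon
```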

Hierarchical Topology

This system (e.g. Kazaa) combines the efficiency of centralization with the resilience of decentralization by introducing super nodes.

Super nodes are peers with higher capabilities (bandwidth, storage, uptime). They communicate with each other to form a backbone of the network.

When a peer wants to publish its resources, it contacts a super node to register them.

To retrieve resources, a peer sends a query to its super node, which communicates with the other super nodes to find the resource.

Collaborative System

Collaborative systems (e.g. BitTorrent) incentivize sharing resources among peers by improving performance for those who contribute.

When a peer wants a resource, it obtains the corresponding .torrent file (distributed outside the system) and contacts the tracker to retrieve a list of peers that are sharing that resource.

Once the peer has the list of peers, it contacts them to download chunks of the file concurrently, becoming a leecher.

Once the peer has downloaded some chunks, it can start sharing them with other peers. The peer downloads the rarest chunks first to increase their availability.

Once all the chunks are downloaded the file is reassembled and the peer becomes a seeder of that resource.

While uploading chunks to other peers, the peer evaluates the connection speed of each one. If a peer is not uploading in return, its connection can be choked to favor peers that contribute more to the network.

This evaluation is performed every 10 seconds, and periodically a connection is optimistically unchoked to give new peers a chance.
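A sketch of the rarest-first selection: among the chunks the peer still needs, pick the one held by the fewest peers in the swarm (the data layout is hypothetical):

```python
# Rarest-first chunk selection over a toy swarm view.

def rarest_first(my_pieces, swarm_pieces):
    # swarm_pieces: dict peer -> set of chunk indexes that peer has
    counts = {}
    for pieces in swarm_pieces.values():
        for p in pieces:
            counts[p] = counts.get(p, 0) + 1
    needed = [p for p in counts if p not in my_pieces]
    # Pick the needed chunk with the fewest copies in the swarm.
    return min(needed, key=lambda p: counts[p]) if needed else None

swarm = {"peer1": {0, 1, 2}, "peer2": {0, 1}, "peer3": {0}}
print(rarest_first(my_pieces={0}, swarm_pieces=swarm))   # 2 (only one copy)
```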

Secure Storage

Secure Storage systems (e.g. FreeNet) provide a distributed storage mechanism that ensures data confidentiality, integrity, and availability across a decentralized network of peers.

Each node and each resource has a unique identifier, drawn from the same identifier space.

When a resource is published, it is encrypted and split into fixed-size blocks (making it impossible for the hosting node to access the content); it is then sent to the node with the nearest id.

To access a resource, a node sends a request to the node with the id nearest to the resource's id; the request is forwarded until it reaches the node that stores the resource. To read the content the peer also needs to know the decryption key.

Each node stores a routing table with the ids of resources and pointers to the nodes that know about them.

When a resource passes through a node, the routing table is updated. Files that are frequently accessed are also cached on the node, not just their routing-table entries.

The network tends to form a small world, as nodes with similar ids tend to cluster together.

Structured Topology

Structured Topology systems (e.g. DHT, Chord) organize nodes in a specific topology to enable efficient lookup operations.

Nodes and items have ids in the same m-bit space (the hash of the IP for a node and the hash of the resource for an item).

Items with key k are stored on the first node with id \ge k (the successor of k).

Search can be performed based on the knowledge of each node:

With a finger table, each node knows the first node with an id greater than id_{current} + 2^i (mod 2^m) for each i, which allows lookups in O(\log n) hops.

When a node n joins the network:

Periodically, each node checks whether the nodes it knows are still up.

Each node also keeps track of a few nearby nodes as replicas, so that failures can be tolerated.
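A simplified sketch of a Chord-style lookup with finger tables (the 6-bit id space and the node set are made up; a real implementation would also handle joins, stabilization, and replication):

```python
# Simplified Chord-style lookup (m = 6 bits); node ids are made up.
# finger i of node n points to the first node whose id follows (n + 2^i) mod 2^m.

M = 6
RING = 2 ** M                              # id space: 0 .. 63
NODES = sorted([1, 8, 14, 21, 32, 38, 42, 48, 51, 56])

def successor(k):
    """First node with id >= k, wrapping around the ring."""
    k = k % RING
    for n in NODES:
        if n >= k:
            return n
    return NODES[0]

def fingers(n):
    return [successor(n + 2 ** i) for i in range(M)]

def in_open(x, a, b):
    """x in the circular interval (a, b), endpoints excluded."""
    return a < x < b if a < b else (x > a or x < b)

def in_right_closed(x, a, b):
    """x in the circular interval (a, b]."""
    return a < x <= b if a < b else (x > a or x <= b)

def lookup(node, key, hops=0):
    succ = successor(node + 1)             # the node's immediate successor
    if in_right_closed(key, node, succ):
        return succ, hops + 1              # the key is stored on the successor
    for f in reversed(fingers(node)):      # closest preceding finger first
        if in_open(f, node, key):
            return lookup(f, key, hops + 1)
    return lookup(succ, key, hops + 1)     # fall back to the successor

print(lookup(8, 54))                       # (56, 3): node 56 stores key 54
```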

Last modified:
Written by: Andrea Lunghi